1   package org.apache.solr.core;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import com.google.common.collect.Lists;
21  import org.apache.solr.common.SolrException;
22  import org.apache.solr.logging.MDCLoggingContext;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.lang.invoke.MethodHandles;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.Collections;
30  import java.util.HashSet;
31  import java.util.LinkedHashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.TreeSet;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.TimeUnit;
38  
39  
40  class SolrCores {
41  
42    private static Object modifyLock = new Object(); // for locking around manipulating any of the core maps.
43    private final Map<String, SolrCore> cores = new LinkedHashMap<>(); // For "permanent" cores
44  
45    //WARNING! The _only_ place you put anything into the list of transient cores is with the putTransientCore method!
46    private Map<String, SolrCore> transientCores = new LinkedHashMap<>(); // For "lazily loaded" cores
47  
48    private final Map<String, CoreDescriptor> dynamicDescriptors = new LinkedHashMap<>();
49  
50    private final Map<String, SolrCore> createdCores = new LinkedHashMap<>();
51  
52    private final CoreContainer container;
53    
54    private Set<String> currentlyLoadingCores = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
55  
56    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
57  
58    // This map will hold objects that are being currently operated on. The core (value) may be null in the case of
59    // initial load. The rule is, never to any operation on a core that is currently being operated upon.
60    private static final Set<String> pendingCoreOps = new HashSet<>();
61  
62    // Due to the fact that closes happen potentially whenever anything is _added_ to the transient core list, we need
63    // to essentially queue them up to be handled via pendingCoreOps.
64    private static final List<SolrCore> pendingCloses = new ArrayList<>();
65  
66    SolrCores(CoreContainer container) {
67      this.container = container;
68    }
69  
70    // Trivial helper method for load, note it implements LRU on transient cores. Also note, if
71    // there is no setting for max size, nothing is done and all cores go in the regular "cores" list
72    protected void allocateLazyCores(final int cacheSize, final SolrResourceLoader loader) {
73      if (cacheSize != Integer.MAX_VALUE) {
74        log.info("Allocating transient cache for {} transient cores", cacheSize);
75        transientCores = new LinkedHashMap<String, SolrCore>(cacheSize, 0.75f, true) {
76          @Override
77          protected boolean removeEldestEntry(Map.Entry<String, SolrCore> eldest) {
78            if (size() > cacheSize) {
79              synchronized (modifyLock) {
80                SolrCore coreToClose = eldest.getValue();
81                log.info("Closing transient core [{}]", coreToClose.getName());
82                pendingCloses.add(coreToClose); // Essentially just queue this core up for closing.
83                modifyLock.notifyAll(); // Wakes up closer thread too
84              }
85              return true;
86            }
87            return false;
88          }
89        };
90      }
91    }
92  
93    protected void putDynamicDescriptor(String rawName, CoreDescriptor p) {
94      synchronized (modifyLock) {
95        dynamicDescriptors.put(rawName, p);
96      }
97    }
98  
99    // We are shutting down. You can't hold the lock on the various lists of cores while they shut down, so we need to
100   // make a temporary copy of the names and shut them down outside the lock.
101   protected void close() {
102     Collection<SolrCore> coreList = new ArrayList<>();
103 
104     // It might be possible for one of the cores to move from one list to another while we're closing them. So
105     // loop through the lists until they're all empty. In particular, the core could have moved from the transient
106     // list to the pendingCloses list.
107 
108     do {
109       coreList.clear();
110       synchronized (modifyLock) {
111         // make a copy of the cores then clear the map so the core isn't handed out to a request again
112         coreList.addAll(cores.values());
113         cores.clear();
114 
115         coreList.addAll(transientCores.values());
116         transientCores.clear();
117 
118         coreList.addAll(pendingCloses);
119         pendingCloses.clear();
120       }
121 
122       for (SolrCore core : coreList) {
123         MDCLoggingContext.setCore(core);
124         try {
125           core.close();
126         } catch (Throwable e) {
127           SolrException.log(log, "Error shutting down core", e);
128           if (e instanceof Error) {
129             throw (Error) e;
130           }
131         } finally {
132           MDCLoggingContext.clear();
133         }
134       }
135     } while (coreList.size() > 0);
136   }
137 
138   //WARNING! This should be the _only_ place you put anything into the list of transient cores!
139   protected SolrCore putTransientCore(NodeConfig cfg, String name, SolrCore core, SolrResourceLoader loader) {
140     SolrCore retCore;
141     log.info("Opening transient core {}", name);
142     synchronized (modifyLock) {
143       retCore = transientCores.put(name, core);
144     }
145     return retCore;
146   }
147 
148   protected SolrCore putCore(String name, SolrCore core) {
149     synchronized (modifyLock) {
150       return cores.put(name, core);
151     }
152   }
153 
154   List<SolrCore> getCores() {
155     List<SolrCore> lst = new ArrayList<>();
156 
157     synchronized (modifyLock) {
158       lst.addAll(cores.values());
159       return lst;
160     }
161   }
162 
163   Set<String> getCoreNames() {
164     Set<String> set = new TreeSet<>();
165 
166     synchronized (modifyLock) {
167       set.addAll(cores.keySet());
168       set.addAll(transientCores.keySet());
169     }
170     return set;
171   }
172 
173   List<String> getCoreNames(SolrCore core) {
174     List<String> lst = new ArrayList<>();
175 
176     synchronized (modifyLock) {
177       for (Map.Entry<String, SolrCore> entry : cores.entrySet()) {
178         if (core == entry.getValue()) {
179           lst.add(entry.getKey());
180         }
181       }
182       for (Map.Entry<String, SolrCore> entry : transientCores.entrySet()) {
183         if (core == entry.getValue()) {
184           lst.add(entry.getKey());
185         }
186       }
187     }
188     return lst;
189   }
190 
191   /**
192    * Gets a list of all cores, loaded and unloaded (dynamic)
193    *
194    * @return all cores names, whether loaded or unloaded.
195    */
196   public Collection<String> getAllCoreNames() {
197     Set<String> set = new TreeSet<>();
198     synchronized (modifyLock) {
199       set.addAll(cores.keySet());
200       set.addAll(transientCores.keySet());
201       set.addAll(dynamicDescriptors.keySet());
202       set.addAll(createdCores.keySet());
203     }
204     return set;
205   }
206 
207   SolrCore getCore(String name) {
208 
209     synchronized (modifyLock) {
210       return cores.get(name);
211     }
212   }
213 
214   protected void swap(String n0, String n1) {
215 
216     synchronized (modifyLock) {
217       SolrCore c0 = cores.get(n0);
218       SolrCore c1 = cores.get(n1);
219       if (c0 == null) { // Might be an unloaded transient core
220         c0 = container.getCore(n0);
221         if (c0 == null) {
222           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + n0);
223         }
224       }
225       if (c1 == null) { // Might be an unloaded transient core
226         c1 = container.getCore(n1);
227         if (c1 == null) {
228           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + n1);
229         }
230       }
231       cores.put(n0, c1);
232       cores.put(n1, c0);
233 
234       c0.setName(n1);
235       c1.setName(n0);
236     }
237 
238   }
239 
240   protected SolrCore remove(String name) {
241 
242     synchronized (modifyLock) {
243       SolrCore tmp = cores.remove(name);
244       SolrCore ret = null;
245       ret = (ret == null) ? tmp : ret;
246       // It could have been a newly-created core. It could have been a transient core. The newly-created cores
247       // in particular should be checked. It could have been a dynamic core.
248       tmp = transientCores.remove(name);
249       ret = (ret == null) ? tmp : ret;
250       tmp = createdCores.remove(name);
251       ret = (ret == null) ? tmp : ret;
252       dynamicDescriptors.remove(name);
253       return ret;
254     }
255   }
256 
257   /* If you don't increment the reference count, someone could close the core before you use it. */
258   SolrCore  getCoreFromAnyList(String name, boolean incRefCount) {
259     synchronized (modifyLock) {
260       SolrCore core = cores.get(name);
261 
262       if (core == null) {
263         core = transientCores.get(name);
264       }
265 
266       if (core != null && incRefCount) {
267         core.open();
268       }
269 
270       return core;
271     }
272   }
273 
274   protected CoreDescriptor getDynamicDescriptor(String name) {
275     synchronized (modifyLock) {
276       return dynamicDescriptors.get(name);
277     }
278   }
279 
280   // See SOLR-5366 for why the UNLOAD command needs to know whether a core is actually loaded or not, it might have
281   // to close the core. However, there's a race condition. If the core happens to be in the pending "to close" queue,
282   // we should NOT close it in unload core.
283   protected boolean isLoadedNotPendingClose(String name) {
284     // Just all be synchronized
285     synchronized (modifyLock) {
286       if (cores.containsKey(name)) {
287         return true;
288       }
289       if (transientCores.containsKey(name)) {
290         // Check pending
291         for (SolrCore core : pendingCloses) {
292           if (core.getName().equals(name)) {
293             return false;
294           }
295         }
296 
297         return true;
298       }
299     }
300     return false;
301   }
302 
303   protected boolean isLoaded(String name) {
304     synchronized (modifyLock) {
305       if (cores.containsKey(name)) {
306         return true;
307       }
308       if (transientCores.containsKey(name)) {
309         return true;
310       }
311     }
312     return false;
313 
314   }
315 
316   protected CoreDescriptor getUnloadedCoreDescriptor(String cname) {
317     synchronized (modifyLock) {
318       CoreDescriptor desc = dynamicDescriptors.get(cname);
319       if (desc == null) {
320         return null;
321       }
322       return new CoreDescriptor(cname, desc);
323     }
324 
325   }
326 
327   // Wait here until any pending operations (load, unload or reload) are completed on this core.
328   protected SolrCore waitAddPendingCoreOps(String name) {
329 
330     // Keep multiple threads from operating on a core at one time.
331     synchronized (modifyLock) {
332       boolean pending;
333       do { // Are we currently doing anything to this core? Loading, unloading, reloading?
334         pending = pendingCoreOps.contains(name); // wait for the core to be done being operated upon
335         if (! pending) { // Linear list, but shouldn't be too long
336           for (SolrCore core : pendingCloses) {
337             if (core.getName().equals(name)) {
338               pending = true;
339               break;
340             }
341           }
342         }
343         if (container.isShutDown()) return null; // Just stop already.
344 
345         if (pending) {
346           try {
347             modifyLock.wait();
348           } catch (InterruptedException e) {
349             return null; // Seems best not to do anything at all if the thread is interrupted
350           }
351         }
352       } while (pending);
353       // We _really_ need to do this within the synchronized block!
354       if (! container.isShutDown()) {
355         if (! pendingCoreOps.add(name)) {
356           log.warn("Replaced an entry in pendingCoreOps {}, we should not be doing this", name);
357         }
358         return getCoreFromAnyList(name, false); // we might have been _unloading_ the core, so return the core if it was loaded.
359       }
360     }
361     return null;
362   }
363 
364   // We should always be removing the first thing in the list with our name! The idea here is to NOT do anything n
365   // any core while some other operation is working on that core.
366   protected void removeFromPendingOps(String name) {
367     synchronized (modifyLock) {
368       if (! pendingCoreOps.remove(name)) {
369         log.warn("Tried to remove core {} from pendingCoreOps and it wasn't there. ", name);
370       }
371       modifyLock.notifyAll();
372     }
373   }
374 
375   protected Object getModifyLock() {
376     return modifyLock;
377   }
378 
379   // Be a little careful. We don't want to either open or close a core unless it's _not_ being opened or closed by
380   // another thread. So within this lock we'll walk along the list of pending closes until we find something NOT in
381   // the list of threads currently being loaded or reloaded. The "usual" case will probably return the very first
382   // one anyway..
383   protected SolrCore getCoreToClose() {
384     synchronized (modifyLock) {
385       for (SolrCore core : pendingCloses) {
386         if (! pendingCoreOps.contains(core.getName())) {
387           pendingCoreOps.add(core.getName());
388           pendingCloses.remove(core);
389           return core;
390         }
391       }
392     }
393     return null;
394   }
395 
396   protected void addCreated(SolrCore core) {
397     synchronized (modifyLock) {
398       createdCores.put(core.getName(), core);
399     }
400   }
401 
402   /**
403    * Return the CoreDescriptor corresponding to a given core name.
404    * Blocks if the SolrCore is still loading until it is ready.
405    * @param coreName the name of the core
406    * @return the CoreDescriptor
407    */
408   public CoreDescriptor getCoreDescriptor(String coreName) {
409     synchronized (modifyLock) {
410       if (cores.containsKey(coreName))
411         return cores.get(coreName).getCoreDescriptor();
412       if (dynamicDescriptors.containsKey(coreName))
413         return dynamicDescriptors.get(coreName);
414       return null;
415     }
416   }
417 
418   /**
419    * Get the CoreDescriptors for every SolrCore managed here
420    * @return a List of CoreDescriptors
421    */
422   public List<CoreDescriptor> getCoreDescriptors() {
423     List<CoreDescriptor> cds = Lists.newArrayList();
424     synchronized (modifyLock) {
425       for (String coreName : getAllCoreNames()) {
426         // TODO: This null check is a bit suspicious - it seems that
427         // getAllCoreNames might return deleted cores as well?
428         CoreDescriptor cd = getCoreDescriptor(coreName);
429         if (cd != null)
430           cds.add(cd);
431       }
432     }
433     return cds;
434   }
435 
436   // cores marked as loading will block on getCore
437   public void markCoreAsLoading(CoreDescriptor cd) {
438     synchronized (modifyLock) {
439       currentlyLoadingCores.add(cd.getName());
440     }
441   }
442 
443   //cores marked as loading will block on getCore
444   public void markCoreAsNotLoading(CoreDescriptor cd) {
445     synchronized (modifyLock) {
446       currentlyLoadingCores.remove(cd.getName());
447     }
448   }
449 
450   // returns when no cores are marked as loading
451   public void waitForLoadingCoresToFinish(long timeoutMs) {
452     long time = System.nanoTime();
453     long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
454     synchronized (modifyLock) {
455       while (!currentlyLoadingCores.isEmpty()) {
456         try {
457           modifyLock.wait(500);
458         } catch (InterruptedException e) {
459           Thread.currentThread().interrupt();
460         }
461         if (System.nanoTime() >= timeout) {
462           log.warn("Timed out waiting for SolrCores to finish loading.");
463           break;
464         }
465       }
466     }
467   }
468   
469   // returns when core is finished loading, throws exception if no such core loading or loaded
470   public void waitForLoadingCoreToFinish(String core, long timeoutMs) {
471     long time = System.nanoTime();
472     long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
473     synchronized (modifyLock) {
474       while (isCoreLoading(core)) {
475         try {
476           modifyLock.wait(500);
477         } catch (InterruptedException e) {
478           Thread.currentThread().interrupt();
479         }
480         if (System.nanoTime() >= timeout) {
481           log.warn("Timed out waiting for SolrCore, {},  to finish loading.", core);
482           break;
483         }
484       }
485     }
486   }
487 
488   public boolean isCoreLoading(String name) {
489     if (currentlyLoadingCores.contains(name)) {
490       return true;
491     }
492     return false;
493   }
494 }